package org.example.utils

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import java.util.{Collections, Properties}

/**
 * 描述 kafka生产者，消费者api，主要用于spark监听器发送报警消息
 */
//noinspection ScalaDocUnknownTag
object KafkaUtil {

  /**
   * 获取Kafka生产者
   * @param brokerList broker信息
   * @return
   */
  def getKafkaProducer(brokerList: String): KafkaProducer[String, String] = {
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //value的序列化;
    properties.put(ProducerConfig.ACKS_CONFIG, "1") //消息反馈;
    val producer4Kafka = new KafkaProducer[String, String](properties)
    producer4Kafka
  }

  /**
   * 获取Kafka的消费者
   * @param brokerList broker信息
   * @param topic topic信息
   * @param consumerGroupId 消费组组
   * @return
   */
  def getKafkaConsumer(brokerList: String, topic: String, consumerGroupId: String): KafkaConsumer[String, String] = {
    val properties = new Properties()
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) //key的序列化;
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) //value的序列化;
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId) //指定groupid
    val consumer4Kafka = new KafkaConsumer[String, String](properties)
    consumer4Kafka.subscribe(Collections.singletonList(topic))
    consumer4Kafka
  }
  /**
   * 获取Kafka的消费者数据流（不带偏移量）
   * @param
   * @param topic topic信息
   * @param ssc
   * @param kafkaParam
   * @return
   */
  def getKafkaStream(topic: String, ssc: StreamingContext,kafkaParam:Map[String, Object]):
  InputDStream[ConsumerRecord[String, String]] = {
    val dStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
    dStream
  }

  /**
   * 获取Kafka的消费者数据流（带偏移量）
   * @param topic
   * @param ssc
   * @param offsets
   * @param kafkaParam
   * @return
   */
  def getKafkaStream(topic: String,ssc:StreamingContext,offsets:Map[TopicPartition,Long],kafkaParam:Map[String, Object])
  : InputDStream[ConsumerRecord[String,String]]={
    val dStream = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParam,offsets))
    dStream
  }



}
